We can combine map and reduce operations to perform more complex computations.
Suppose we want to compute the sum of the squares $$ \sum_{i=1}^n x_i^2 $$ where the elements $x_i$ are stored in an RDD.
B=sc.parallelize(range(4))
B.collect()
[0, 1, 2, 3]
Perform assignment after each computation
Squares=B.map(lambda x:x*x)
Squares.reduce(lambda x,y:x+y)
14
Combine computations into a single cascaded command
B.map(lambda x:x*x)\
.reduce(lambda x,y:x+y)
14
The only difference between the two versions is that the first stores the intermediate result in Squares, while the second does not. The execution is identical!

The standard way that the map and the reduce are executed is to first perform the map and store the resulting RDD (Squares) in memory, and then perform the reduce on Squares. This has two drawbacks: the intermediate result (Squares) requires memory space, and the data is scanned twice, first B and then Squares - double the cache-misses.

A pipelined execution instead performs the whole computation in a single pass: for each element of B it computes the square and feeds the square directly as input to the reduce operation. This type of pipelined evaluation is related to Lazy Evaluation. The word Lazy is used because the first command (computing the square) is not executed immediately. Instead, the execution is delayed as long as possible so that several commands are executed in a single pass.
The delayed commands are organized in an Execution plan
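We can inspect this execution plan directly. A minimal sketch, assuming the Squares RDD defined above: RDD.toDebugString() returns a description of the lineage of delayed transformations that Spark has recorded but not yet executed (the exact format, and whether it is returned as a string or as bytes, varies across Spark versions).
# print the lineage (execution plan) recorded for the delayed map
print(Squares.toDebugString())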
Here is another way to compute the sum of the squares using a single reduce command. What is wrong with it?
C=sc.parallelize([1,1,1])
C.reduce(lambda x,y: x*x+y*y)
5
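The problem is that reduce requires a function that is associative and commutative and that combines two values into a value of the same kind; here the partial results get squared again. For [1,1,1] Spark computes (1*1 + 1*1) = 2 and then (2*2 + 1*1) = 5 rather than the correct sum of squares, 3, and in general the answer depends on the order in which elements are combined. A minimal sketch of a correct single-command alternative uses RDD.aggregate(), which takes a zero value, a within-partition function, and a cross-partition combiner:
# square each element as it is folded in, then add partial sums across partitions
C.aggregate(0, lambda acc, x: acc + x*x, lambda a, b: a + b)
3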
RDDs typically have hundreds of thousands of elements. It usually makes no sense to print out the content of a whole RDD. Here are some ways to get manageable amounts of information about an RDD.
n=1000000
B=sc.parallelize([0,0,1,0]*(n//4))
#find the number of elements in the RDD
B.count()
1000000
# get the first few elements of an RDD
print('first element=', B.first())
print('first 5 elements =', B.take(5))
first element= 0
first 5 elements = [0, 0, 1, 0, 0]
# get a sample whose expected size is m
m=5.
B.sample(False,m/n).collect()
[1, 0, 1, 0, 0, 0]
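Note that sample(False, m/n) decides independently for each element whether to include it with probability m/n (the False means sampling without replacement), so the actual sample size is random with expected value m; here we happened to get 6 elements.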
The method RDD.filter(func) returns a new dataset formed by selecting those elements of the source on which func returns true.
# How many positive numbers?
B.filter(lambda x: x > 0).count()
250000
The method RDD.distinct(numPartitions=None) returns a new dataset that contains the distinct elements of the source dataset.
# Removing the duplicate elements in DuplicateRDD gives us a distinct RDD
DuplicateRDD = sc.parallelize([1,1,2,2,3,3])
DistinctRDD = DuplicateRDD.distinct()
DistinctRDD.collect()
[1, 2, 3]
The method RDD.flatMap(func) is similar to map, but each input item can be mapped to 0 or more output items (so func should return an iterable, such as a list, rather than a single item).
text=["you are my sunshine","my only sunshine"]
text_file = sc.parallelize(text)
# map each line in text to a list of words
print('map:', text_file.map(lambda line: line.split(" ")).collect())
# create a single list of words by combining the words from all of the lines
print('flatmap:', text_file.flatMap(lambda line: line.split(" ")).collect())
map: [['you', 'are', 'my', 'sunshine'], ['my', 'only', 'sunshine']]
flatmap: ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']
In this part, we explore set operations in PySpark, including union, intersection, subtract, and cartesian.
rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([1, 3, 4, 5])
rdd1.union(rdd2).collect()
[1, 1, 2, 3, 1, 3, 4, 5]
rdd1.intersection(rdd2).collect()
[1, 3]
rdd1.subtract(rdd2).collect()
[2]
print(rdd1.cartesian(rdd2).collect())
[(1, 1), (1, 3), (1, 4), (1, 5), (1, 1), (1, 3), (1, 4), (1, 5), (2, 1), (2, 3), (2, 4), (2, 5), (3, 1), (3, 3), (3, 4), (3, 5)]
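Note that union() simply concatenates the two RDDs without removing duplicates (apply distinct() afterwards if needed), intersection() returns each common element only once, and subtract() removes every occurrence of any element that appears in the second RDD, which is why both copies of 1 are gone from its result.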